5.2 channel的底层实现

通道在并发编程中是比较常用的一个内容,在我们进行实际业务开发时会大量使用到。

本节我们将针对通道的底层实现原理进行探讨学习,包括底层的结构、处理机制等内容。

本节代码存放目录为 lesson14

通道的基本结构

我们可以通过源码包 runtime/chan.go 找到关于通道的源码,其中详细定义了通道的结构以及相关的实现。

结构如下所示:

type hchan struct {
    qcount   uint           // 通道中的元素个数
    dataqsiz uint           // 环形队列的大小(缓冲区大小)
    buf      unsafe.Pointer // 指向底层缓冲区的指针
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识通道是否已关闭

    sendx    uint           // 下一个发送元素的索引
    recvx    uint           // 下一个接收元素的索引

    recvq    waitq          // 接收者等待队列
    sendq    waitq          // 发送者等待队列

    lock     mutex          // 互斥锁,保护通道的并发操作
}

在上面的结构中,我们可以主要关注bugrecvqsendq这三个内容。

buf

这是指向底层缓冲区的指针,主要用于有缓冲通道,无缓冲通道是用不到这个东西的。

也就是说,对于有缓冲通道,数据发送后就会首先进入到buf中,当读取数据时也是从buf中读取。

例如我们做如下声明:

ch := make(chan int, 3)

那么在底层buf的实际结构可能是这样的:

+----+----+----+
| 1  | 2  |    |   <- 这是缓冲区
+----+----+----+

sendx这时候可能指向第三个空位置,表示下一个数据将存放在这里。

recvx可能指向第一个位置,表示下一个读取操作将从这里获取数据。

sendxrecvx到达缓冲区的末尾时,它们会环绕回到数组的起始位置 (这就是环形队列的特性)。


recvq 与 sendq

这两者都是一个队列,准确的说是一个接收任务队列与一个发送任务队列。

当一个Goroutine尝试从通道接收数据,但通道(无缓冲或缓冲区为空)没有可用的数据时,该Goroutine会被阻塞,并被放入recvq队列中。

一旦通道中有数据可用,recvq中最早阻塞的Goroutine会被唤醒,从通道中接收数据并继续执行。

当一个Goroutine尝试向通道发送数据,但通道(无缓冲或缓冲区已满)没有空闲空间时,该Goroutine会被阻塞,并被放入sendq队列中。

一旦通道有空闲的空间(例如,接收操作消耗了一个缓冲区数据),sendq中最早阻塞的Goroutine会被唤醒,继续发送数据。

我们可以通过下面的例子来查看:

noBufChan := make(chan int)

wg.Add(1)
go func() {
    defer wg.Done()
    fmt.Println("enter goroutine receive 1")
    for ch := range noBufChan {
        fmt.Printf("noBufChan receive 1 -> %d\n", ch)
    }
}()

time.Sleep(time.Duration(1) * time.Second)

wg.Add(1)
go func() {
    defer wg.Done()
    fmt.Println("enter goroutine receive 2")
    for ch := range noBufChan {
        fmt.Printf("noBufChan receive 2 -> %d\n", ch)
    }
}()

time.Sleep(time.Duration(1) * time.Second)

wg.Add(1)
go func() {
    defer wg.Done()
    fmt.Println("enter goroutine send 1")
    time.Sleep(time.Duration(3) * time.Second)
    for i := 0; i < 100; i++ {
        noBufChan <- i
        fmt.Printf("noBufChan send -> %d\n", i)
        time.Sleep(time.Duration(1) * time.Second)
    }
}()

wg.Wait()

结果输出如下所示:

enter goroutine receive 1
enter goroutine receive 2
enter goroutine send 1
noBufChan receive 1 -> 0
noBufChan send -> 0
noBufChan send -> 1
noBufChan receive 2 -> 1
noBufChan send -> 2
noBufChan receive 1 -> 2

在上面的代码中,我们先使用receive 1receive 2读取数据,这时候通道里面是没有数据的,所以会阻塞。

等到send开始执行时,读取协程开始恢复接收,读取到发送的数据。

通道关闭机制及性能优化

关闭机制

通道的关闭会经过一系列的资源释放及状态标记,我们可以通过源码包 runtime/chan.go 中的closechan函数查看具体实现。

主要分为以下几个步骤:

  1. 检查通道是否为nil

    if c == nil {
        panic(plainError("close of nil channel"))
    }
    

    首先检查通道是否为nil,如果是nil通道,调用panic抛出错误。

  2. 获取通道的锁

    lock(&c.lock)
    

    Go语言的通道是线程安全的,使用锁来确保多个Goroutine同时操作通道时不会发生竞态条件。

  3. 检查通道是否已经关闭

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    

    检查通道是否已经关闭。如果通道已经关闭,再次关闭时会触发panic

  4. 竞态条件检测

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
        racerelease(c.raceaddr())
    }
    

    这里的raceenabled是竞态条件检测相关的逻辑。racewritepcracerelease用于检测并处理竞态条件,确保在竞态检测开启的情况下能够捕捉到相关问题。

  5. 标记通道已关闭

    c.closed = 1
    

    将通道的closed字段设为1,表示通道已经关闭。

  6. 处理接收者队列

    var glist gList
    
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    

    遍历通道的接收者队列recvq,将所有等待接收数据的Goroutine释放。

    对于每一个在接收队列中的Goroutine,如果它的elem字段非nil,会将其清空,并将Goroutine标记为未成功接收数据。

    最后将这些Goroutine加入到glist中。

  7. 处理发送者队列

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    

    类似地,遍历发送者队列sendq,将所有等待发送数据的Goroutine释放,并将其加入到glist中。

    由于通道关闭后不能再发送数据,所有在发送队列中的Goroutine将会panic

  8. 释放锁

    unlock(&c.lock)
    

    完成通道关闭操作后,释放通道的锁。

  9. 唤醒所有阻塞的Goroutine

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
    

    最后,唤醒glist中所有的Goroutine,即唤醒那些被阻塞在接收或发送操作上的Goroutine,这些Goroutine会检测通道已经关闭,并采取相应的操作(如返回零值或panic)。


性能优化与通道使用建议

上文中我们已经对通道的结构及原理进行了一个大致的了解,在日常使用的时候我们也需要注意一些内容。

  • 使用缓冲通道提高性能:在高并发场景下,适当的缓冲区大小可以减少Goroutine的阻塞次数,从而提高性能。

  • 避免滥用无缓冲通道:无缓冲通道会导致更多的阻塞和上下文切换,可能导致性能问题,尤其是在大量数据传输时,一般来说无缓冲通道我们都是用在一些流程控制上。

小结

本节我们讲解了通道的底层结构以及使用建议。

关于本节总结如下:

  • 底层通过buf实现缓冲通道

  • 通过发送队列、接收队列实现堵塞

  • 关闭通道时释放一系列资源、唤醒读取堵塞队列

results matching ""

    No results matching ""